
Conversation

@YannByron
Contributor

Tips

What is the purpose of the pull request

(For example: This pull request adds quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@YannByron
Contributor Author

@leesf @xushiyan
please review this if you have a chance.
This has been verified locally, and the test reports have been submitted in this ticket.

@alexeykudinkin
Contributor

@YannByron can you please add PR description, so that it's a little more clear what we're trying to tackle here?

@YannByron
Contributor Author

@YannByron can you please add PR description, so that it's a little more clear what we're trying to tackle here?

Sure.

After upgrading to Spark 3.2, the TestMORDataSource.testCount UT fails: it asserts that hudiIncDF4SkipMerge.count() equals 200, but the result is 100. The cause is that no data can be read from the parquet file when requiredStructSchema is empty; I think this is a problem in parquet 1.12.1.
To work around it, I pass the full schema to the parquet reader and extract the required schema outside.
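
A minimal sketch of that projection step, assuming the full and required schemas are available as Spark StructTypes (the method and parameter names here are illustrative, not the actual change; the PR itself uses an extractRequiredSchema(iter: Iterator[InternalRow]) helper shown further down):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types.StructType

// Read rows with the full table schema, then project only the required columns afterwards,
// instead of handing an empty requiredStructSchema to the parquet reader.
private def projectToRequiredSchema(iter: Iterator[InternalRow],
                                    fullStructType: StructType,
                                    requiredStructType: StructType): Iterator[InternalRow] = {
  // Bind each required column to its ordinal in the full schema.
  val requiredColumnRefs = requiredStructType.map { f =>
    BoundReference(fullStructType.fieldIndex(f.name), f.dataType, f.nullable)
  }
  val projection = UnsafeProjection.create(requiredColumnRefs)
  // UnsafeProjection reuses its output buffer, so copy if rows are buffered downstream.
  iter.map(row => projection(row).copy())
}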

TestMORDataSource.testPrunedFiltered also fails when DefaultHoodieRecordPayload is used. Both PAYLOAD_ORDERING_FIELD_PROP_KEY and PAYLOAD_EVENT_TIME_FIELD_PROP_KEY default to ts and are never updated along with preCombineField, which either causes a failure or leaves records not updated with the latest precombine value.
So I set the current preCombineField as the value of both PAYLOAD_ORDERING_FIELD_PROP_KEY and PAYLOAD_EVENT_TIME_FIELD_PROP_KEY.
It's weird that this case passes on Spark versions below 3.2, but these changes work on all Spark versions.

Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
val properties = HoodiePayloadConfig.newBuilder
  .withPayloadOrderingField(preCombineField.get)
  .withPayloadEventTimeField(preCombineField.get)
Contributor

this is a bug if not set?

Contributor Author

I think yes.
If it's not set, upsert may produce a wrong result when DefaultHoodieRecordPayload is used. And in Spark 3.2, if no ordering.field or event.time.field is found, setting returnNullIfNotFound to true does not take effect when calling HoodieAvroUtils.getNestedFieldVal, like #4169.

Member

setting returnNullIfNotFound to true does not take effect when calling HoodieAvroUtils.getNestedFieldVal

this looks like a bug in getNestedFieldVal()? Is it fixable in Spark 3.2?

Contributor Author

I think not. I also have no idea why the exception occurs when returnNullIfNotFound is set to true and getNestedFieldVal is called. But if PAYLOAD_ORDERING_FIELD_PROP_KEY is set automatically when preCombineField is set, which is what we should have been doing, it works well.


private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
  val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
  val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
Member

this is not used?

Contributor Author

Yes, I should remove this.

rows
}

private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
Member

the name could imply returning a schema

Suggested change
private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
private def toRowsWithRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {

Comment on lines 737 to 741
// use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY and PAYLOAD_EVENT_TIME_FIELD_PROP_KEY
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
  mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
  mergedParams.put(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
}
Member

the expected scenario should be: when preCombineField is set, PAYLOAD_ORDERING_FIELD_PROP_KEY is set automatically, and PAYLOAD_EVENT_TIME_FIELD_PROP_KEY is never required. It'll be good to fix these along the way.

Contributor Author

@YannByron commented Jan 17, 2022

But I see it didn't do that, and I don't know how it could have run correctly before.

Btw, why is PAYLOAD_EVENT_TIME_FIELD_PROP_KEY never required? After all, DefaultHoodieRecordPayload will use it for now.

@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Jan 17, 2022
Member

@xushiyan left a comment

@YannByron let me clarify my thoughts:

  • event time field can be different from ordering / precombine field: the former is used to calculate latency/freshness; it can be another field say occurred_at and the latency will be calculated as commit_time - occurred_at, while the records are ordered or precombined using updated_at
  • there are users who need to set event time and there are those who don't care. For the latter, we can't just set it as the ordering field; we should leave it not set and then nothing will be calculated or persisted in commit metadata
  • as you reported "setting returnNullIfNotFound to true does not take effect", then we should fix that instead of masking it by setting event time

WDYT?
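
A hypothetical write configuration illustrating that distinction (occurred_at and updated_at are the example columns from the comment above; id, the paths, and the table name are made up; the option keys are Hudi's precombine and payload properties):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-event-time-example").getOrCreate()
val df = spark.read.parquet("/tmp/source")  // any source DataFrame with id, updated_at, occurred_at

// updated_at orders/precombines records; occurred_at is only used for latency reporting
// (commit_time - occurred_at) in commit metadata and may be left unset entirely.
df.write.format("hudi")
  .option("hoodie.table.name", "example_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.payload.ordering.field", "updated_at")     // PAYLOAD_ORDERING_FIELD_PROP_KEY
  .option("hoodie.payload.event.time.field", "occurred_at")  // PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, optional
  .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  .mode("append")
  .save("/tmp/hudi/example_table")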

@YannByron
Contributor Author

@xushiyan I really appreciate your explanation. I'll restore the original logic.

@YannByron
Contributor Author

@xushiyan what's the difference between ordering.field and precombine.field?

@YannByron
Contributor Author

@YannByron let me clarify my thoughts:

  • event time field can be different from ordering / precombine field: the former is used to calculate latency/freshness; it can be another field say occurred_at and the latency will be calculated as commit_time - occurred_at, while the records are ordered or precombined using updated_at
  • there are users who need to set event time and there are those who don't care. For the latter, we can't just set it as the ordering field; we should leave it not set and then nothing will be calculated or persisted in commit metadata
  • as you reported "setting returnNullIfNotFound to true does not take effect", then we should fix that instead of masking it by setting event time

WDYT?

@xushiyan I removed the code that sets event.time from precombine.field, but I keep setting ordering.field from precombine.field; is that right?
And for "setting returnNullIfNotFound to true does not take effect", I can't fix it soon, so I just catch the exception in this case to make it work.
Please review again.
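
A sketch of the two pieces described above, under these assumptions: the mergedParams block quoted earlier now derives only the ordering field, the nested-field lookup is guarded until returnNullIfNotFound is fixed, avroRecord and orderingField are placeholder names, and getNestedFieldVal's exact signature varies between Hudi versions:

// Assumed shape: only the ordering field is derived from the precombine field;
// the event time field is left for the user to configure explicitly.
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
  mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
}

// Assumed shape of the interim guard: treat a failed nested-field lookup as "field not found"
// instead of failing, until returnNullIfNotFound is fixed in the follow-up.
val orderingVal =
  try {
    HoodieAvroUtils.getNestedFieldVal(avroRecord, orderingField, true)
  } catch {
    case _: Exception => null
  }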

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@xushiyan
Member

xushiyan commented Jan 26, 2022


@YannByron yes we should tackle returnNullIfNotFound as a follow-up issue and PR. Filed https://issues.apache.org/jira/browse/HUDI-3333

@xushiyan xushiyan merged commit 4a9f826 into apache:master Jan 26, 2022
@xushiyan xushiyan self-assigned this Jan 26, 2022
liusenhua pushed a commit to liusenhua/hudi that referenced this pull request Mar 1, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022